/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.tests.util.kafka;

import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.test.util.FileUtils;
import org.apache.flink.tests.util.AutoClosableProcess;
import org.apache.flink.tests.util.CommandLineWrapper;
import org.apache.flink.tests.util.TestUtils;
import org.apache.flink.tests.util.activation.OperatingSystemRestriction;
import org.apache.flink.tests.util.cache.DownloadCache;
import org.apache.flink.util.OperatingSystem;

import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * {@link KafkaResource} that downloads kafka and sets up a local kafka cluster with the bundled
 * zookeeper.
 */
public class LocalStandaloneKafkaResource implements KafkaResource {

    private static final Logger LOG = LoggerFactory.getLogger(LocalStandaloneKafkaResource.class);
    private static final Pattern ZK_DATA_DIR_PATTERN = Pattern.compile(".*(dataDir=).*");
    private static final Pattern KAFKA_LOG_DIR_PATTERN = Pattern.compile(".*(log\\.dirs=).*");

    private static final String ZOOKEEPER_HOST = "localhost";
    private static final int ZOOKEEPER_PORT = 2181;
    private static final String ZOOKEEPER_ADDRESS = ZOOKEEPER_HOST + ':' + ZOOKEEPER_PORT;
    private static final String KAFKA_HOST = "localhost";
    private static final int KAFKA_PORT = 9092;
    private static final String KAFKA_ADDRESS = KAFKA_HOST + ':' + KAFKA_PORT;

    private final TemporaryFolder tmp = new TemporaryFolder();

    private final DownloadCache downloadCache = DownloadCache.get();
    private final String kafkaVersion;
    private Path kafkaDir;
    @Nullable private Path logBackupDirectory;

    LocalStandaloneKafkaResource(final String kafkaVersion, @Nullable Path logBackupDirectory) {
        OperatingSystemRestriction.forbid(
                String.format(
                        "The %s relies on UNIX utils and shell scripts.",
                        getClass().getSimpleName()),
                OperatingSystem.WINDOWS);
        this.kafkaVersion = kafkaVersion;
        this.logBackupDirectory = logBackupDirectory;
    }

    private static String getKafkaDownloadUrl(final String kafkaVersion) {
        return String.format(
                "https://archive.apache.org/dist/kafka/%s/kafka_2.11-%s.tgz",
                kafkaVersion, kafkaVersion);
    }

    @Override
    public void before() throws Exception {
        tmp.create();
        downloadCache.before();

        this.kafkaDir = tmp.newFolder("kafka").toPath().toAbsolutePath();
        setupKafkaDist();
        setupKafkaCluster();
    }

    private void setupKafkaDist() throws IOException {
        final Path downloadDirectory = tmp.newFolder("getOrDownload").toPath();
        final Path kafkaArchive =
                downloadCache.getOrDownload(getKafkaDownloadUrl(kafkaVersion), downloadDirectory);

        LOG.info("Kafka location: {}", kafkaDir.toAbsolutePath());
        AutoClosableProcess.runBlocking(
                CommandLineWrapper.tar(kafkaArchive)
                        .extract()
                        .zipped()
                        .strip(1)
                        .targetDir(kafkaDir)
                        .build());

        LOG.info("Updating ZooKeeper properties");
        FileUtils.replace(
                kafkaDir.resolve(Paths.get("config", "zookeeper.properties")),
                ZK_DATA_DIR_PATTERN,
                matcher ->
                        matcher.replaceAll("$1" + kafkaDir.resolve("zookeeper").toAbsolutePath()));

        LOG.info("Updating Kafka properties");
        FileUtils.replace(
                kafkaDir.resolve(Paths.get("config", "server.properties")),
                KAFKA_LOG_DIR_PATTERN,
                matcher -> matcher.replaceAll("$1" + kafkaDir.resolve("kafka").toAbsolutePath()));
    }

    private void setupKafkaCluster() throws IOException {
        LOG.info("Starting zookeeper");
        AutoClosableProcess.runBlocking(
                kafkaDir.resolve(Paths.get("bin", "zookeeper-server-start.sh")).toString(),
                "-daemon",
                kafkaDir.resolve(Paths.get("config", "zookeeper.properties")).toString());
        LOG.info("Starting kafka");
        AutoClosableProcess.runBlocking(
                kafkaDir.resolve(Paths.get("bin", "kafka-server-start.sh")).toString(),
                "-daemon",
                kafkaDir.resolve(Paths.get("config", "server.properties")).toString());

        while (!isZookeeperRunning(kafkaDir)) {
            try {
                LOG.info("Waiting for ZooKeeper to start.");
                Thread.sleep(500L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        while (!isKafkaRunning(kafkaDir)) {
            try {
                LOG.info("Waiting for Kafka to start.");
                Thread.sleep(500L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    @Override
    public void afterTestSuccess() {
        shutdownResource();
        downloadCache.afterTestSuccess();
        tmp.delete();
    }

    @Override
    public void afterTestFailure() {
        shutdownResource();
        backupLogs();
        downloadCache.afterTestFailure();
        tmp.delete();
    }

    private void shutdownResource() {
        try {
            AutoClosableProcess.runBlocking(
                    kafkaDir.resolve(Paths.get("bin", "kafka-server-stop.sh")).toString());
            while (isKafkaRunning(kafkaDir)) {
                try {
                    Thread.sleep(500L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        } catch (IOException ioe) {
            LOG.warn("Error while shutting down kafka.", ioe);
        }
        try {
            AutoClosableProcess.runBlocking(
                    kafkaDir.resolve(Paths.get("bin", "zookeeper-server-stop.sh")).toString());
            while (isZookeeperRunning(kafkaDir)) {
                try {
                    Thread.sleep(500L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        } catch (IOException ioe) {
            LOG.warn("Error while shutting down zookeeper.", ioe);
        }
    }

    private void backupLogs() {
        if (logBackupDirectory != null) {
            final Path targetDirectory =
                    logBackupDirectory.resolve("kafka-" + UUID.randomUUID().toString());
            try {
                Files.createDirectories(targetDirectory);
                TestUtils.copyDirectory(kafkaDir.resolve("logs"), targetDirectory);
                LOG.info("Backed up logs to {}.", targetDirectory);
            } catch (IOException e) {
                LOG.warn("An error has occurred while backing up logs to {}.", targetDirectory, e);
            }
        }
    }

    private static boolean isZookeeperRunning(final Path kafkaDir) {
        try {
            queryBrokerStatus(kafkaDir, line -> {});
            return true;
        } catch (final IOException ioe) {
            // we get an exception if zookeeper isn't running
            return false;
        }
    }

    private static boolean isKafkaRunning(final Path kafkaDir) throws IOException {
        try {
            final AtomicBoolean atomicBrokerStarted = new AtomicBoolean(false);
            queryBrokerStatus(
                    kafkaDir,
                    line -> {
                        atomicBrokerStarted.compareAndSet(false, line.contains("\"port\":"));
                    });
            return atomicBrokerStarted.get();
        } catch (final IOException ioe) {
            // we get an exception if zookeeper isn't running
            return false;
        }
    }

    private static void queryBrokerStatus(
            final Path kafkaDir, final Consumer<String> stderrProcessor) throws IOException {
        AutoClosableProcess.create(
                        kafkaDir.resolve(Paths.get("bin", "zookeeper-shell.sh")).toString(),
                        ZOOKEEPER_ADDRESS,
                        "get",
                        "/brokers/ids/0")
                .setStdoutProcessor(stderrProcessor)
                .runBlocking();
    }

    @Override
    public void createTopic(int replicationFactor, int numPartitions, String topic)
            throws IOException {
        AutoClosableProcess.runBlocking(
                kafkaDir.resolve(Paths.get("bin", "kafka-topics.sh")).toString(),
                "--create",
                "--zookeeper",
                ZOOKEEPER_ADDRESS,
                "--replication-factor",
                String.valueOf(replicationFactor),
                "--partitions",
                String.valueOf(numPartitions),
                "--topic",
                topic);
    }

    @Override
    public void sendMessages(String topic, String... messages) throws IOException {
        List<String> args = createSendMessageArguments(topic);
        sendMessagesAndWait(args, messages);
    }

    @Override
    public void sendKeyedMessages(String topic, String keySeparator, String... messages)
            throws IOException {
        List<String> args = new ArrayList<>(createSendMessageArguments(topic));
        args.add("--property");
        args.add("parse.key=true");
        args.add("--property");
        args.add("key.separator=" + keySeparator);

        sendMessagesAndWait(args, messages);
    }

    private List<String> createSendMessageArguments(String topic) {
        return Arrays.asList(
                kafkaDir.resolve(Paths.get("bin", "kafka-console-producer.sh")).toString(),
                "--broker-list",
                KAFKA_ADDRESS,
                "--topic",
                topic);
    }

    private void sendMessagesAndWait(List<String> kafkaArgs, String... messages)
            throws IOException {
        try (AutoClosableProcess autoClosableProcess =
                AutoClosableProcess.runNonBlocking(kafkaArgs.toArray(new String[0]))) {

            try (PrintStream printStream =
                    new PrintStream(
                            autoClosableProcess.getProcess().getOutputStream(),
                            true,
                            StandardCharsets.UTF_8.name())) {
                for (final String message : messages) {
                    printStream.println(message);
                }
                printStream.flush();
            }

            try {
                // wait until the process shuts down on it's own
                // this is the only reliable way to ensure the producer has actually processed our
                // input
                autoClosableProcess.getProcess().waitFor();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    @Override
    public List<String> readMessage(int expectedNumMessages, String groupId, String topic)
            throws IOException {
        final List<String> messages =
                Collections.synchronizedList(new ArrayList<>(expectedNumMessages));

        try (final AutoClosableProcess kafka =
                AutoClosableProcess.create(
                                kafkaDir.resolve(Paths.get("bin", "kafka-console-consumer.sh"))
                                        .toString(),
                                "--bootstrap-server",
                                KAFKA_ADDRESS,
                                "--from-beginning",
                                "--max-messages",
                                String.valueOf(expectedNumMessages),
                                "--topic",
                                topic,
                                "--consumer-property",
                                "group.id=" + groupId)
                        .setStdoutProcessor(messages::add)
                        .runNonBlocking()) {

            final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(120));
            while (deadline.hasTimeLeft() && messages.size() < expectedNumMessages) {
                try {
                    LOG.info(
                            "Waiting for messages. Received {}/{}.",
                            messages.size(),
                            expectedNumMessages);
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            if (messages.size() != expectedNumMessages) {
                throw new IOException("Could not read expected number of messages.");
            }
            return messages;
        }
    }

    @Override
    public void setNumPartitions(int numPartitions, String topic) throws IOException {
        AutoClosableProcess.runBlocking(
                kafkaDir.resolve(Paths.get("bin", "kafka-topics.sh")).toString(),
                "--alter",
                "--topic",
                topic,
                "--partitions",
                String.valueOf(numPartitions),
                "--zookeeper",
                ZOOKEEPER_ADDRESS);
    }

    @Override
    public int getNumPartitions(String topic) throws IOException {
        final Pattern partitionCountPattern = Pattern.compile(".*PartitionCount:\\s*([0-9]+).*");
        final AtomicReference<Integer> partitionCountFound = new AtomicReference<>(-1);
        AutoClosableProcess.create(
                        kafkaDir.resolve(Paths.get("bin", "kafka-topics.sh")).toString(),
                        "--describe",
                        "--topic",
                        topic,
                        "--zookeeper",
                        ZOOKEEPER_ADDRESS)
                .setStdoutProcessor(
                        line -> {
                            final Matcher matcher = partitionCountPattern.matcher(line);
                            if (matcher.matches()) {
                                partitionCountFound.compareAndSet(
                                        -1, Integer.parseInt(matcher.group(1)));
                            }
                        })
                .runBlocking();
        return partitionCountFound.get();
    }

    @Override
    public long getPartitionOffset(String topic, int partition) throws IOException {
        final Pattern partitionOffsetPattern = Pattern.compile(".*:.*:([0-9]+)");
        final AtomicReference<Integer> partitionOffsetFound = new AtomicReference<>(-1);
        AutoClosableProcess.create(
                        kafkaDir.resolve(Paths.get("bin", "kafka-run-class.sh")).toString(),
                        "kafka.tools.GetOffsetShell",
                        "--broker-list",
                        KAFKA_ADDRESS,
                        "--topic",
                        topic,
                        "--partitions",
                        String.valueOf(partition),
                        "--time",
                        "-1")
                .setStdoutProcessor(
                        line -> {
                            final Matcher matcher = partitionOffsetPattern.matcher(line);
                            if (matcher.matches()) {
                                partitionOffsetFound.compareAndSet(
                                        -1, Integer.parseInt(matcher.group(1)));
                            }
                        })
                .runBlocking();

        final int partitionOffset = partitionOffsetFound.get();
        if (partitionOffset == -1) {
            throw new IOException("Could not determine partition offset.");
        }
        return partitionOffset;
    }

    @Override
    public Collection<InetSocketAddress> getBootstrapServerAddresses() {
        return Collections.singletonList(
                InetSocketAddress.createUnresolved(KAFKA_HOST, KAFKA_PORT));
    }

    @Override
    public InetSocketAddress getZookeeperAddress() {
        return InetSocketAddress.createUnresolved(KAFKA_HOST, KAFKA_PORT);
    }
}
